消息隊列 - RabbitMQ實作
陳柏仁/
2023-09-23
/166 views
今天,我們將使用nodejs來建立一個小程式,模擬服務之間透過RabbitMQ傳輸。
前言
今天,我們將使用 nodejs 來建立一個小程式,模擬服務之間透過 RabbitMQ 傳輸。預計會建立一個連線(connection),開兩個通道(channel),一個為傳送者,另一個為接收者,在建立連線後,透過 rabbitMQ 每隔一秒傳送"Hello world!"字串,並且印出來。
事前準備
在實作之前,必須先下載並安裝好以下軟體:
實作
安裝
RabbitMQ 支援多種協定。我們將使用 AMQP 0-9-1 協議來連線,請先下載 npm 套件 amqplib
npm install amqplib
建立連線
引入套件,並建立連線
連線需要 connection string,預設值為amqp://localhost:5672
。為了安全考量,在生產環境需加入帳號密碼,此時 connection string 格式為: amqp://${username}:${password}@${host}
,我用程式判斷並提取變數出來當環境變數,避免程式出現密碼明碼,程式碼如下:
const amqp = require("amqplib")
;(async () => {
const username = process.env.USER_NAME
const password = process.env.PASSWORD
const host = process.env.HOST || "localhost:5672"
try {
let connectionString = `amqp://${host}`
if (username && password) {
connectionString = `amqp://${username}:${password}@${host}`
}
const connection = await amqp.connect(connectionString)
console.log(`[RabbitMQ] Connected to ${host}`)
} catch (error) {
console.error(`[RabbitMQ] Error occured from RabbitMQ server: ${host}`)
console.error(error.stack)
}
})()
建立接收者
接著建立接收者,首先,建立第一個通道(channel1)及隊列(queue1)
const queue = "queue1"
const channel1 = await connection.createChannel()
await channel1.assertQueue(queue)
建立成功後,透過 consume 函式建立連結並監聽,當接收到訊息時印出來,回傳 ack 確認接收成功
channel1.consume(queue, message => {
if (message !== null) {
console.log(`Recieved message:`, message.content.toString())
channel1.ack(message)
}
})
建立傳送者
接著建立傳送者,一樣先建立另一個通道(channel2),設定定時器及透過 sendToQueue 函式,每隔一秒傳送"Hello world!"到隊列(queue1)
const channel2 = await connection.createChannel()
setInterval(() => {
const sendMessage = "Hello world!"
channel2.sendToQueue(queue, Buffer.from(sendMessage))
}, 1000)
完成並執行
經過整理後,完整程式碼會長這樣:
const amqp = require("amqplib")
;(async () => {
const username = process.env.USER_NAME
const password = process.env.PASSWORD
const host = process.env.HOST || "localhost:5672"
try {
let connectionString = `amqp://${host}`
if (username && password) {
connectionString = `amqp://${username}:${password}@${host}`
}
const connection = await amqp.connect(connectionString)
console.log(`[RabbitMQ] Connected to ${host}`)
const queue = "queue1"
const channel1 = await connection.createChannel()
await channel1.assertQueue(queue)
channel1.consume(queue, message => {
if (message !== null) {
console.log(`Recieved message:`, message.content.toString())
channel1.ack(message)
}
})
const channel2 = await connection.createChannel()
setInterval(() => {
const sendMessage = "Hello world!"
channel2.sendToQueue(queue, Buffer.from(sendMessage))
}, 1000)
} catch (error) {
console.error(`[RabbitMQ] Error occured from RabbitMQ server: ${host}`)
console.error(error.stack)
}
})()
執行程式,結果如下:
[RabbitMQ] Connected to localhost:5672
Received message: Hello world!
Received message: Hello world!
Received message: Hello world!
Received message: Hello world!
結論
今天透過一個簡單的小程式,實作了 RabbitMQ 的連線,然而,在實作上會將傳送者與接收者分開,一個服務可能就是一個傳送者/接收者,服務與服務之間會透過與 RabbitMQ 的互動來獲取資料。